Cloud Composer で DAG パーサーのログを Cloud Logging に出力したい
こんにちは!エノカワです。
Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。
今回の記事では、Cloud Composer で DAG パーサーのログを Cloud Logging に出力する方法について検証した内容をご紹介します。
やりたいこと
DAG のトップレベルコードで Airflow 変数にアクセスする際、そのアクセスがパースごとに発生していることを確認したいと考えています。
Cloud Logging を利用して、このプロセスを可視化し、DAG パース時のパフォーマンスに対する影響を評価することを目的としています。
経緯
以下のドキュメントをきっかけに、トップレベルのコードにおける Airflow 変数のアクセスがパースごとに発生していることを Cloud Logging のログで確認しようとしました。
しかし、ログに出力されなかったため、調査を進めた結果、dag_processor_log_target
を stdout
に設定することでログに出力されることがわかりました。
Using Airflow Variables yields network calls and database access, so their usage in top-level Python code for DAGs should be avoided as much as possible, as mentioned in the previous chapter, Top level Python Code. If Airflow Variables must be used in top-level DAG code, then their impact on DAG parsing can be mitigated by enabling the experimental cache, configured with a sensible ttl.
Airflow ベストプラクティスドキュメントによると、Airflow 変数をトップレベルのコードで使用することは、ネットワークアクセスやデータベースアクセスを伴うため、できる限り避けるべきだとされています。
dag_processor_log_target
とは
dag_processor_log_target
は、DAG パーサーが生成するログの出力先を指定するための Airflow の設定オプションです。
設定値の種類
file
: 特定のファイルにログを出力しますstdout
: 標準出力にログを出力します
この設定をstdout
にすることで、DAG のパース時に発生するログを Cloud Logging に送信することが可能になります。
環境作成
DAG を動かすための Cloud Composer 環境を作成します。
Google Cloud コンソールの「Cloud Composer の環境作成」ページから、test-composer
という名前の環境を、東京リージョンで作成しました。
サービスアカウントなどその他の設定はデフォルトのままで作成しました。
Airflow 変数を設定する
Airflow 変数を設定することで、タスク間でデータを共有し、DAG の動作をカスタマイズできます。
今回の検証用に Variable 変数を事前に設定しておきます。
以下のコマンドを使用して、my_variable
という変数に my_value を設定しています。
$ gcloud composer environments run test-composer \
--location asia-northeast1 \
variables -- set my_variable my_value
DAG を作成する
次に、DAG ファイルを作成します。
ここでは、トップレベルとタスク内でそれぞれ Airflow 変数にアクセスし、その旨がわかるログを出力する DAG を用意します。
これにより、DAG パース時とタスク実行時の両方で変数アクセスのログを確認できます。
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from datetime import datetime
import logging
def log_variable():
task_level_var = Variable.get("my_variable", default_var="default_value")
logging.info(f"Task-level variable access: {task_level_var}")
top_level_var = Variable.get("my_variable", default_var="default_value")
logging.info(f"Top-level variable access: {top_level_var}")
dag = DAG(
'variable_access_logging_dag_dag',
default_args={'start_date': datetime(2023, 1, 1)},
schedule_interval=None,
)
task = PythonOperator(
task_id='log_variable_task',
python_callable=log_variable,
dag=dag,
)
DAG を実行する
DAG を Cloud Composer にデプロイし、手動実行します。
これにより、DAG のパースプロセスが開始され、指定したログが出力されるはずです。
ログを確認してみましょう。
Cloud Composer のコンソール画面の [ログ] をクリックして、Cloud Logging の ログエクスプローラ画面を開きます。
DAG の Airflow 変数のアクセスログのみ表示させたいため、下記クエリでログをフィルタリングします。
resource.type="cloud_composer_environment"
severity=INFO
SEARCH("variable `access:`")
トップレベルとタスク内それぞれの Airflow 変数アクセスログが出力されていることが確認できました。
INFO 2024-11-09 12:05:16.279 Top-level variable access: my_value
INFO 2024-11-09 12:05:17.427 Task-level variable access: my_value
この時点では、DAG の実行時のログは確認できましたが、定期的に実行されるはずの DAG パース時のアクセスログは出力されていませんでした。
これは、DAG パーサーのログが標準出力に向けられていないためでした。
Airflow 構成オプションをオーバーライドする
Cloud Composer では、Airflow の構成オプションをオーバーライドすることが可能です。
dag_processor_log_target
を stdout
に設定することで、ログを標準出力に出力し、Cloud Logging に転送されるようにします。
以下のコマンドでオーバーライドします。
$ gcloud composer environments update test-composer \
--location asia-northeast1 \
--update-airflow-configs=logging-dag_processor_log_target=stdout
DAG パーサーのログを確認する
再び Cloud Logging の ログエクスプローラ画面を開き、DAG パーサーのログが出力されているかを確認します。
トップレベルの Airflow 変数アクセスログが複数出力されています。
INFO 2024-11-09 12:15:10.109 Top-level variable access: my_value
INFO 2024-11-09 12:15:43.904 Top-level variable access: my_value
INFO 2024-11-09 12:16:16.508 Top-level variable access: my_value
INFO 2024-11-09 12:16:48.514 Top-level variable access: my_value
INFO 2024-11-09 12:17:20.425 Top-level variable access: my_value
INFO 2024-11-09 12:17:52.194 Top-level variable access: my_value
INFO 2024-11-09 12:18:24.003 Top-level variable access: my_value
INFO 2024-11-09 12:18:55.419 Top-level variable access: my_value
INFO 2024-11-09 12:19:27.600 Top-level variable access: my_value
INFO 2024-11-09 12:19:59.300 Top-level variable access: my_value
ログを確認すると、約30秒間隔で定期的にトップレベルの変数アクセスが行われていることが確認できました。
これは、Cloud Composer の DAG パーサーが定期的に DAG ファイルをスキャンし、パースを実行していることを示しています。
まとめ
以上、Cloud Composer で DAG パーサーのログを Cloud Logging に出力する方法について解説しました。
今回の検証により、DAG パース時の変数アクセスの挙動を実際に確認することができました。
約30秒間隔でトップレベルコードでの変数アクセスが発生していることが確認でき、「トップレベルでの変数アクセスは避けるべき」という Airflow のベストプラクティスの根拠を実際のログで理解することができました。
DAG パーサーのログは、スケジューラプロセスによって生成されるため、スケジューラのログに含まれます。
これらのログには、DAG のパース中に発生したエラーや警告、情報メッセージが記録されます。
Cloud Logging でこれらのログを可視化することで、パフォーマンスの問題や異常な動作の早期発見、トラブルシューティングの効率化につながります。
みなさんのワークフロー管理にも取り入れてみてください!
参考
- Configuration Reference — Airflow Documentation
- Global environment variables | Astronomer Documentation
- Airflow コマンドライン インターフェースにアクセスする | Cloud Composer | Google Cloud
- Airflow 構成オプションをオーバーライドする | Cloud Composer | Google Cloud
- Best Practices — Airflow Documentation
- Cloud Composer で Airflow DAG 解析時間を短縮す | Google Cloud 公式ブログ